This example demonstrates how to scatter values from a source array to implement the equivalent of the `numpy.repeat` function.
The method here uses one DMA channel per repeat of each source value (i.e., `samples_per_source` channels in total). However, it should be possible to achieve the same result with the DMA scatter/gather functionality, using at most 2 DMA channels.
In [1]:
import numpy as np

# Host-side reference: the layout produced by `np.repeat` here is what the
# DMA transfers configured later in this notebook are checked against.
num_sources = 4
samples_per_source = 5
src_array = np.arange(1, num_sources + 1)

# A parenthesised, single-argument `print` is valid (and prints the same
# text) on both Python 2 and Python 3.
print(src_array)
print(np.repeat(src_array, samples_per_source))
In [2]:
import numpy as np
from teensy_minimal_rpc import SerialProxy
import teensy_minimal_rpc.DMA as dma
# Discard the proxy left over from an earlier run of this cell (if there is
# one) before opening a fresh connection.
if 'proxy' in globals():
    del proxy
proxy = SerialProxy()
In [3]:
# Size of each device buffer.  Treated as a byte count: the zero-fill below
# writes N // 4 32-bit words.
N = 512
# Number of leading bytes that are initialised in the source buffer and
# displayed from both buffers.
num_preview = 16

proxy.free_all()

# Allocate source array
src_addr = proxy.mem_alloc(N)
# Allocate destination array
dst_addr = proxy.mem_alloc(N)

# Fill the first `num_preview` bytes of the source array with 1..num_preview.
proxy.mem_cpy_host_to_device(src_addr,
                             np.arange(1, num_preview + 1, dtype='uint8'))
# Fill the destination array with all zeros.  Floor division keeps the word
# count an integer on Python 3 as well (`N / 4` would be a float there).
proxy.mem_fill_uint32(dst_addr, 0, N // 4)

# Single-argument `print` with explicit formatting emits the same text on
# Python 2 and Python 3.
print('SOURCE:  %s' % (proxy.mem_cpy_device_to_host(src_addr, num_preview),))
print('TARGET:  %s' % (proxy.mem_cpy_device_to_host(dst_addr, num_preview),))
In [4]:
# Build one Transfer Control Descriptor (TCD) per repeat, i.e.
# `samples_per_source` descriptors, uploaded to DMA channels
# 0..samples_per_source - 1.
#
# Descriptor `i` reads `num_sources` bytes from the start of the source
# buffer (source offset 1) and writes them starting at `dst_addr + i` with a
# destination offset of `samples_per_source`, so source byte `k` lands at
# destination offset `k * samples_per_source + i`.  Once every descriptor has
# run, the destination holds the `numpy.repeat` layout.
#
# NOTE(review): field meanings are taken from the eDMA TCD register names
# (SOFF/DOFF = per-transfer address offsets, SLAST/DLASTSGA = address
# adjustment applied when the major loop completes) -- confirm against the
# device reference manual.
#
# These two values must match the ones used for the NumPy reference result
# that the transfer output is compared against.
num_sources = 4
samples_per_source = 5

tcds = [
    dma.TCD(
        # One major-loop iteration; the whole copy is a single minor loop.
        CITER_ELINKNO=dma.R_TCD_ITER_ELINKNO(ITER=1),
        BITER_ELINKNO=dma.R_TCD_ITER_ELINKNO(ITER=1),
        # Byte-wide reads and writes.
        ATTR=dma.R_TCD_ATTR(SSIZE=dma.R_TCD_ATTR._8_BIT,
                            DSIZE=dma.R_TCD_ATTR._8_BIT),
        NBYTES_MLNO=num_sources,
        SADDR=int(src_addr),
        SOFF=1,
        # Undo the source/destination address advance made during the copy.
        SLAST=-num_sources,
        DADDR=int(dst_addr + i),
        DOFF=samples_per_source,
        DLASTSGA=-samples_per_source * num_sources,
        CSR=dma.R_TCD_CSR(
            START=0, DONE=False,
            # Every descriptor except the last links to channel `i + 1` on
            # completion, so the channels run as a chain.  MAJORLINKCH is
            # still set on the last descriptor, but linking is disabled there.
            MAJORELINK=i < samples_per_source - 1,
            MAJORLINKCH=i + 1))
    # `range` (rather than `xrange`) works on both Python 2 and Python 3.
    for i in range(samples_per_source)
]

# Upload descriptor `channel` to DMA channel `channel`.
for channel, tcd in enumerate(tcds):
    proxy.update_dma_TCD(channel, tcd)
In [5]:
# Number of destination bytes the chained transfers are expected to write.
num_result = num_sources * samples_per_source

# Single-argument `print` with explicit formatting emits the same text on
# Python 2 and Python 3.
print('SOURCE:  %s' % (proxy.mem_cpy_device_to_host(src_addr, 16),))
# Fill the destination array with all zeros.  Floor division keeps the word
# count an integer on Python 3 as well (`N / 4` would be a float there).
proxy.mem_fill_uint32(dst_addr, 0, N // 4)
print('TARGET:')
print(' Before: %s' % (proxy.mem_cpy_device_to_host(dst_addr, 16),))

# Request a start on DMA channel 0.  The descriptors uploaded earlier link
# each channel to the next, so this one request runs the whole chain.
# NOTE(review): nothing here waits for the transfer to finish before reading
# the result back -- presumably the serial round trip is slow enough; confirm.
proxy.update_dma_registers(dma.Registers(SSRT=0))

# Read the result once, so the values displayed are exactly the values checked.
result = proxy.mem_cpy_device_to_host(dst_addr, num_result)
print(' After: %s' % (result,))

# Confirm that output array matches expected values.  Unlike a bare `assert`,
# this is not stripped under `python -O` and reports the mismatching elements.
np.testing.assert_array_equal(result,
                              np.repeat(src_array, samples_per_source))